【AWS CDK】Step Functions内のLambdaをGlue (Python Shell)に置き換えてみた

【AWS CDK】Step Functions内のLambdaをGlue (Python Shell)に置き換えてみた

Clock Icon2024.07.24

データアナリティクス事業本部のueharaです。

今回は、Step Functions内のLambdaをGlue (Python Shell)にAWS CDKを利用して置き換えてみたいと思います。

はじめに

Lambdaには最大で15分という実行時間の制限があります。

したがって、最初はLambdaで行っていた処理も処理が肥大化してくると「実行時間制限に収まらない!」といったケースが出てきます。

今回はStep Functions内で利用されているLambdaをGlueに置き換える処理をAWS CDKを用いてやってみたいと思います。

イメージ図

2024-723_cdk_blog_01

以下で説明する全体のコードはGitHubにアップロードしています。

移行に際しての観点

Step Functions内のLambdaについて、Glue (Python Shell)への置き換えでは以下の事項を考慮する必要があります。

  • Lambda Layerでパッケージしていたものはどうするか?
  • Lambdaのソースとしてアップロードしていた共通のpyモジュールのディレクトリはどうするか?
  • lambda_handlerevent として受け取っていたステートマシンの入力をどうするか?
  • ステートマシンへの出力として lambda_handler の返り値として渡していたものをどうするか?
  • Lambdaの環境変数と設定していたものはどうするか?

以上を踏まえ、元のLambdaのスクリプトの修正を最小限にしつつ、Glue (Python Shell)へ移行してみたいと思います。

移行前の構成を作成(Lambda)

まず、AWS CDKで移行前の構成を作成します。

プロジェクトディレクトリは以下の通りです。
node_modules, cdk.out ディレクトリについては省略しています。

.
├── bin
│   └── cdk-my-sample-app.ts
├── lib
│   ├── cdk-my-sample-app-stack.ts
│   ├── iam_role.ts
│   ├── lambda.ts
│   └── stepfunctions_with_lambda.ts
├── resources
│   ├── lambda
│   │   ├── common
│   │   │   ├── __init__.py
│   │   │   └── my_utils.py
│   │   ├── end_handler.py
│   │   └── start_handler.py
│   └── lambda_layer
│       └── requirements.txt
├── test
│   └── cdk-sample-app.test.ts
├── README.md
├── cdk.json
├── jest.config.js
├── package-lock.json
├── package.json
└── tsconfig.json

以下、主要なファイルのみ説明を行います。

主要ファイルの説明

lib/lambda.ts

lambda.ts は以下です。

lambda.ts
import { Duration } from 'aws-cdk-lib';
import * as iam from 'aws-cdk-lib/aws-iam';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import { Construct } from 'constructs';

type dict = { [key: string]: any; }

export function createLambdaFunction(scope: Construct, id: string, role: iam.Role, layers: lambda.ILayerVersion[], environment: dict = {}): lambda.Function {

    const lambdaSourceName = id.replace(/-/g, '_')

    return new lambda.Function(scope, id, {
        functionName: `${id}-test`,
        runtime: lambda.Runtime.PYTHON_3_9,
        handler: `${lambdaSourceName}.lambda_handler`,
        code: lambda.Code.fromAsset('resources/lambda'),
        memorySize: 128,
        timeout: Duration.seconds(900),
        role: role,
        layers: layers,
        environment: environment,
    });
}

使用するhandlerやLambda Layer、環境変数の値を受け取り、 aws_lambda.Function クラスを返す関数を定義しています。

Lambda関数にデプロイする資材は resources/lambda 配下のものを指定しています。したがって、 my_utils.py を含む common ディレクトリと、各種handlerの .py ファイルがアップロードされます。

lib/stepfunctions_with_lambda.ts

stepfunctions_with_lambda.ts は以下です。

tepfunctions_with_lambda.ts
import * as cdk from 'aws-cdk-lib';
import * as iam from 'aws-cdk-lib/aws-iam';
import * as sfn from 'aws-cdk-lib/aws-stepfunctions';
import * as tasks from 'aws-cdk-lib/aws-stepfunctions-tasks';
import { Construct } from 'constructs';

export function createStepFunctions(
    scope: Construct,
    id: string,
    stateMachineRole: iam.Role,
    startLambda: cdk.aws_lambda.Function,
    endLambda: cdk.aws_lambda.Function,
): sfn.StateMachine {
    const retry: sfn.RetryProps = {
        errors: ['States.ALL'],
        maxAttempts: 2,
    }

    // Lambda タスク
    const startTask = new tasks.LambdaInvoke(scope, `${id}_start_task`, {
        lambdaFunction: startLambda,
        outputPath: '$.Payload',
    }).addRetry(retry)

    const endTask = new tasks.LambdaInvoke(scope, `${id}_end_task`, {
        lambdaFunction: endLambda,
        outputPath: '$.Payload',
    }).addRetry(retry);

    // ステートマシンの定義
    const definition = startTask.next(endTask);

    return new sfn.StateMachine(scope, id, {
        stateMachineName: `${id}`,
        definitionBody: sfn.DefinitionBody.fromChainable(definition),
        timeout: cdk.Duration.hours(1),
        role: stateMachineRole,
    });
}

2つの aws_lambda.Function クラスを受け取り、連結しています。(冒頭で示したイメージ図の通りです)

lib/cdk-my-sample-app-stack.ts

cdk-my-sample-app-stack.ts は以下です。

cdk-my-sample-app-stack.ts
import * as lambdaPython from '@aws-cdk/aws-lambda-python-alpha';
import * as cdk from 'aws-cdk-lib';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import { Construct } from 'constructs';
import { createLambdaRole, createStepFunctionsRole } from './iam_role';
import { createLambdaFunction } from './lambda';
import { createStepFunctions } from './stepfunctions_with_lambda';

export class CdkMySampleAppStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    // LambdaLayerの作成
    const baseLayer = new lambdaPython.PythonLayerVersion(this, 'BaseLayer', {
      entry: 'resources/lambda_layer',
      compatibleRuntimes: [lambda.Runtime.PYTHON_3_9],
      description: 'The layer containing Python dependencies',
    });
    const layers = [baseLayer]

    // Lambda IAM Role
    const lambdaRole = createLambdaRole(this, 'my-lambda-role');

    // 環境変数の定義
    const env_vars = {
      SAVE_BUCKET_NAME: 'uehara-test-bucket',
    }

    // Lambda Function
    const startLambda = createLambdaFunction(this, 'start-handler', lambdaRole, layers, env_vars);
    const endLambda = createLambdaFunction(this, 'end-handler', lambdaRole, layers, {})

    // Step Functions IAM Role
    const stepFunctionsRole = createStepFunctionsRole(this, 'my-sfn-role');

    // Step Functions with Lambda
    const stepFunctions = createStepFunctions(this, 'my-sfn-with-lambda', stepFunctionsRole, startLambda, endLambda);
  }
}

基本的にはコメントに記載している通りに定義をしています。

Lambda Layerについては resources/lambda_layer 配下にある requirements.txt を参照して作成しています。

resources/lambda 配下のファイル

resources/lambda 配下のファイルを以下にまとめて記載します。

requirements.txt
pytz==2024.1

requirements.txtpytz モジュールのみ記載しています。

my_utils.py
def get_hello_str(name):
    return f"Hello! {name}"

サンプルとして、 my_utils.py には名前を引数として受け取ると Hello! <名前> という文字列を返す関数を記載します。

start_handler.py
import json
import os

from common import my_utils

SAVE_BUCKET_NAME = os.getenv("SAVE_BUCKET_NAME", "")

def lambda_handler(event, context):
    print(f"event: {json.dumps(event)}")
    name = event.get("name", "")

    hello_str = my_utils.get_hello_str(name)
    print(f"{hello_str}")

    s3_uri = f"s3://{SAVE_BUCKET_NAME}/test"

    return {
        "result": "success",
        "s3_uri": s3_uri,
    }

この start_handler.py が今回Glueへ移行する対象のスクリプトになります。

大したことはしていませんが、

  • Step Functionsの入力パラメータの利用
  • 共通モジュールの利用
  • 環境変数の利用
  • Step Functionsの出力パラメータの利用

を行うように構成しています。

後続の end_handler.py は前段の出力を表示するだけで、特に処理はしていません。

end_handler.py
import json

def lambda_handler(event, context):
    print(f"event: {json.dumps(event)}")

    return {
        "result": "success"
    }

デプロイしてみる

必要モジュールをインストールし、デプロイを行います。

# 必要モジュールのインストール
$ npm i -D @aws-cdk/aws-lambda-python-alpha@2.139.0-alpha.0

# デプロイ
$ cdk deploy CdkMySampleAppStack --profile <AWSプロファイル名>

Step Functionsの実行

デプロイが完了すると以下のようなStep Functionsが作成されます。

2024-723_cdk_blog_02

作成されたStep Functionsに次のようなパラメータを渡し実行してみます。

{
  "name": "uehara"
}

最初のステートの出力は以下の通りです。

2024-723_cdk_blog_03

Lambda関数の返り値として設定した、 results3_uri がステートの出力パラメータになっていることが分かります。

ステートの出力
{
  "result": "success",
  "s3_uri": "s3://uehara-test-bucket/test"
}

ログにも、きちんと期待された値が出力されていることが確認できました。

2024-723_cdk_blog_04

それでは、上記をGlueに置き換えたいと思います。

移行後の構成を作成(Glue)

早速ですが、プロジェクトディレクトリは以下の通りです。
node_modules, cdk.out ディレクトリについては省略しています。

$ tree -I "node_modules|cdk.out" --dirsfirst        
.
├── bin
│   └── cdk-my-sample-app.ts
├── lib
│   ├── cdk-my-sample-app-stack.ts
│   ├── glue.ts ★
│   ├── glue_scripts.ts ★ 
│   ├── iam_role.ts
│   ├── lambda.ts
│   ├── stepfunctions_with_glue.ts ★
│   └── stepfunctions_with_lambda.ts
├── resources
│   ├── glue ★
│   │   ├── common
│   │   │   ├── __init__.py
│   │   │   └── my_utils.py
│   │   ├── job_scripts
│   │   │   └── start_glue.py
│   │   └── pyproject.toml
│   ├── lambda
│   │   ├── common
│   │   │   ├── __init__.py
│   │   │   └── my_utils.py
│   │   ├── end_handler.py
│   │   └── start_handler.py
│   └── lambda_layer
│       └── requirements.txt
├── test
│   └── cdk-sample-app.test.ts
├── README.md
├── cdk.json
├── jest.config.js
├── package-lock.json
├── package.json
└── tsconfig.json

「★」印がついているファイル/ディレクトリが今回新規に作成したものになります。

以下、主要なファイルのみ説明を行います。

主要ファイルの説明

lib/glue_scripts.ts

glue_scripts.ts はGlueのスクリプトをデプロイするための記述になります。

Lambdaでは先に示した通り common ディレクトリやhandlerファイルがあるディレクトリを aws_lambda.Function で直接指定していましたが、GlueはS3への配置を自分で行わなければなりません。

glue_scripts.ts
import * as cdk from 'aws-cdk-lib';
import * as s3 from 'aws-cdk-lib/aws-s3';
import { Construct } from 'constructs';

export function deployGlueJobScript(scope: Construct, id: string) {
    const destinationBucket = s3.Bucket.fromBucketName(scope, `${id}-bucket`, 'cm-da-uehara');

    // GlueスクリプトをS3バケットにアップロード
    new cdk.aws_s3_deployment.BucketDeployment(scope, id, {
        sources: [cdk.aws_s3_deployment.Source.asset("resources/glue/job_scripts")],
        destinationBucket:  destinationBucket,
        destinationKeyPrefix: 'glue-scripts/',
    });
}

export function deployGlueCommonScript(scope: Construct, id: string) {
    const destinationBucket = s3.Bucket.fromBucketName(scope, `${id}-bucket`, 'cm-da-uehara');

    // commonディレクトリをwhlファイルに変換してS3にアップロード
    new cdk.aws_s3_deployment.BucketDeployment(scope, id, {
        sources: [
          cdk.aws_s3_deployment.Source.asset(
            'resources/glue',
            {
              bundling: {
                image: cdk.DockerImage.fromRegistry('python:3.9'),
                command: [
                  'bash',
                  '-c',
                  'pip install --user --upgrade pip && ' +
                    'pip install --user --no-cache-dir build wheel && ' +
                    'python -m build --wheel && ' +
                    'cp dist/*.whl /asset-output/common-0.1-py3-none-any.whl && ' +
                    'rm -rf dist build *.egg-info',
                ],
                user: 'root',
              },
            }
          ),
        ],
        destinationBucket: destinationBucket,
        destinationKeyPrefix: 'glue-scripts-common/',
    });
}

上記の通り、 Glueのスクリプト自体は .py ファイルを直接S3にアップロードしてますが、共通モジュール群を格納する common ディレクトリについてはwheel化を行っています。

Glue実行時にはこのwheel化したモジュールを追加ファイルとして指定することで、実行スクリプトから呼び出しを行うことができます。

lib/glue.ts

glue.ts はGlue Jobそのものを記載するファイルになります。

glue.ts 
import * as iam from 'aws-cdk-lib/aws-iam';
import * as cdk from 'aws-cdk-lib';
import * as glue from 'aws-cdk-lib/aws-glue';
import { Construct } from 'constructs';

type dict = { [key: string]: any; }

export function createGlueJob(scope: Construct, id: string, role: iam.Role, environment: dict = {}): glue.CfnJob {

    const glueSourceName = id.replace(/-/g, '_')

    // Glue Jobの定義
    const glueJob = new glue.CfnJob(scope, id, {
        name: `${id}`,
        role: role.roleArn,
        command: {
            name: 'pythonshell',
            pythonVersion: '3.9',
            scriptLocation: `s3://cm-da-uehara/glue-scripts/${glueSourceName}.py`,
        },
        executionProperty: {
            maxConcurrentRuns: 3,
        },
        defaultArguments: {
            '--job-language': 'python',
            '--extra-py-files': `s3://cm-da-uehara/glue-scripts-common/common-0.1-py3-none-any.whl`,
            '--additional-python-modules': 'pytz==2024.1',
            '--environment': JSON.stringify(environment),
        },
    });

    return glueJob;
}

ポイントは defaultArguments の記載になります。

--extra-py-files として、先にwheel化した common モジュールを指定しています。

また、LambdaではLayerとして作成していた pytz==2024.1 を --additional-python-modules として指定しています。
これにより、Glue起動時に pip でインストールされる形になります。

インターネットに接続できない環境にある、という場合は、やはり --additional-python-modulespip を利用する形でなく、wheel化をする必要が出てきますのでその点はご留意下さい。

また、Lambdaで環境変数として受け取っていた値についてもキー・バリューの値を JSON.stringify() をした上で --environment に渡しています。

lib/stepfunctions_with_glue.ts

stepfunctions_with_glue.ts は以下の通りです。

stepfunctions_with_glue.ts
import * as cdk from 'aws-cdk-lib';
import * as iam from 'aws-cdk-lib/aws-iam';
import * as sfn from 'aws-cdk-lib/aws-stepfunctions';
import * as tasks from 'aws-cdk-lib/aws-stepfunctions-tasks';
import { Construct } from 'constructs';

export function createStepFunctionsWithGlue(
    scope: Construct,
    id: string,
    stateMachineRole: iam.Role,
    startGlue: cdk.aws_glue.CfnJob,
    endLambda: cdk.aws_lambda.Function,
): sfn.StateMachine {
    const retry: sfn.RetryProps = {
        errors: ['States.ALL'],
        maxAttempts: 2,
    }

    // Glue タスク
    const startTask = new tasks.GlueStartJobRun(scope, `${id}_start_task`, {
        glueJobName: startGlue.name!,
        integrationPattern: sfn.IntegrationPattern.RUN_JOB,
        arguments: sfn.TaskInput.fromObject({
            '--event': sfn.JsonPath.stringAt('States.JsonToString($)'),
            '--task_token': sfn.JsonPath.taskToken
        }),
        timeout: cdk.Duration.minutes(30),
    }).addRetry(retry)

    // Lambda タスク
    const endTask = new tasks.LambdaInvoke(scope, `${id}_end_task`, {
        lambdaFunction: endLambda,
        outputPath: '$.Payload',
    }).addRetry(retry);

    // ステートマシンの定義
    const definition = startTask.next(endTask);

    return new sfn.StateMachine(scope, id, {
        stateMachineName: `${id}`,
        definitionBody: sfn.DefinitionBody.fromChainable(definition),
        timeout: cdk.Duration.hours(1),
        role: stateMachineRole,
    });
}

移行前ではLambdaにしていた startTask の部分をGlueに置き換えています。

ここでもポイントは arguments 部分になります。

Lambdaでは event として取得できていたステートマシンからの入力を、 sfn.JsonPath.stringAt('States.JsonToString($)') という形にした上で --event として渡しています。

また、GlueはLambdaのようにスクリプトから return した値がステートマシンの出力とならない(つぎのステートに入力として渡すことができない)ので、 --task_token として 実行タスクのトークンを渡し、そちらに明示的に結果を返すようにします。

resources/glue/job_scripts/start_glue.py

移行前の start_handler.py に該当するのが start_glue.py となります。

start_glue.py
import json
import os
import sys

import boto3
from awsglue.utils import getResolvedOptions
from common import my_utils

SAVE_BUCKET_NAME = None

sfn_client = boto3.client("stepfunctions")

def get_task_token(args):
    for i in range(len(args)):
        if args[i] == "--task_token":
            return args[i + 1]
    raise Exception("task_tokenが見つかりませんでした")

def send_success_to_step_functions(task_token, output):
    response = sfn_client.send_task_success(taskToken=task_token, output=output)
    print(f"response: {response}")

def send_fail_to_step_functions(task_token, error=None, cause=None):
    response = sfn_client.send_task_failure(
        taskToken=task_token, error=error, cause=cause
    )
    print(f"response: {response}")

def main(argv):
    callback_token = get_task_token(argv)
    try:
        # argumentの取得
        args = getResolvedOptions(argv, ["event", "environment"])
        event = json.loads(args["event"])

        # 環境変数の設定
        env_dict = json.loads(args["environment"])
        for key, value in env_dict.items():
            os.environ[key.upper()] = str(value)
        global SAVE_BUCKET_NAME
        SAVE_BUCKET_NAME = os.getenv("SAVE_BUCKET_NAME", "")

        print(f"event: {json.dumps(event)}")
        name = event.get("name", "")

        hello_str = my_utils.get_hello_str(name)
        print(f"{hello_str}")

        s3_uri = f"s3://{SAVE_BUCKET_NAME}/test"

        output = {
            "result": "success",
            "s3_uri": s3_uri,
        }
        send_success_to_step_functions(callback_token, json.dumps(output))
    except Exception as e:
        error = type(e).__name__
        cause = str(e)
        send_fail_to_step_functions(callback_token, error=error, cause=cause)

if __name__ == "__main__":
    main(sys.argv)

引数として与えられた eventjson_loads() を利用してロードすることにより、Lambdaで event を取得したときと同じ状態にしています。

また、 event として渡された環境変数も os モジュールを使ってsetすることにより、Lambdaで環境変数を与えられた状態と同じ状態にしています。

これらは、既存のLambdaスクリプトの改修を最小限に抑える前処理となります。

Step Functionsへの出力は、 return ではなく boto3 のStep Functions クライアントにある send_task_success() や send_task_failure() を呼び出すことで実現しています。

AWSの公式ドキュメントにも記載されていますが、上記のように .sync ジョブであっても、途中でタスクトークンに対し SendTaskSuccessSendTaskFailure APIがコールされると、提供されたデータを使用してタスクの完了およびジョブの監視を停止し、ワークフローを続行します。

※ただ、Step FunctionsのAPIのコールそのものが失敗した場合は通常通りGlueのJob Runのエラーとなるため、その点はご留意下さい。

これにより、 Lambda で return で返していた出力を擬似的に再現しています。

Step FunctionsのAWSサービス毎の統合についてはこちらの公式ドキュメントをご確認下さい。

lib/cdk-my-sample-app-stack.ts

cdk-my-sample-app-stack.ts は以下です。

先のLambdaの構成に加え、Glueの記載を追記しています。

cdk-my-sample-app-stack.ts
import * as lambdaPython from '@aws-cdk/aws-lambda-python-alpha';
import * as cdk from 'aws-cdk-lib';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import { Construct } from 'constructs';
import { createLambdaRole, createStepFunctionsRole, createGlueRole } from './iam_role';
import { createLambdaFunction } from './lambda';
import { createStepFunctions } from './stepfunctions_with_lambda';
import { createStepFunctionsWithGlue } from './stepfunctions_with_glue';
import { deployGlueJobScript, deployGlueCommonScript } from './glue_scripts';
import { createGlueJob } from './glue';

export class CdkMySampleAppStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    // LambdaLayerの作成
    const baseLayer = new lambdaPython.PythonLayerVersion(this, 'BaseLayer', {
      entry: 'resources/lambda_layer',
      compatibleRuntimes: [lambda.Runtime.PYTHON_3_9],
      description: 'The layer containing Python dependencies',
    });
    const layers = [baseLayer]

    // Lambda IAM Role
    const lambdaRole = createLambdaRole(this, 'my-lambda-role');

    // 環境変数の定義
    const env_vars = {
      SAVE_BUCKET_NAME: 'uehara-test-bucket',
    }

    // Lambda Function
    const startLambda = createLambdaFunction(this, 'start-handler', lambdaRole, layers, env_vars);
    const endLambda = createLambdaFunction(this, 'end-handler', lambdaRole, layers, {})

    // Step Functions IAM Role
    const stepFunctionsRole = createStepFunctionsRole(this, 'my-sfn-role');

    // Step Functions with Lambda
    const stepFunctions = createStepFunctions(this, 'my-sfn-with-lambda', stepFunctionsRole, startLambda, endLambda);

    // Glueのスクリプトをデプロイ
    deployGlueCommonScript(this, 'deploy-glue-common-script')
    deployGlueJobScript(this, 'deploy-glue-job-script');

    // Glue IAM Role
    const glueRole = createGlueRole(this, 'my-glue-role');

    // Glue
    const startGlue = createGlueJob(this, 'start-glue', glueRole, env_vars);

    // StepFunctions with Glue
    const stepFunctions_with_glue
      = createStepFunctionsWithGlue(this, 'my-sfn-with-glue', stepFunctionsRole, startGlue, endLambda);
  }
}

デプロイしてみる

デプロイについては先と同様になります。

# デプロイ
$ cdk deploy CdkMySampleAppStack --profile <AWSプロファイル名>

Step Functionsの実行

デプロイが完了すると、1つ目のステートがLamdaからGlueになっているStep Functionsが作成されます。

2024-723_cdk_blog_05

作成されたStep Functionsに次のようなパラメータを渡し実行してみます。

{
  "name": "uehara"
}

最初のステートの出力は以下の通りです。

2024-723_cdk_blog_06

Lambda関数の構成と同様、Glueについても返り値として設定した results3_uri がステートの出力パラメータになっていることが分かります。

ステートの出力
{
  "result": "success",
  "s3_uri": "s3://uehara-test-bucket/test"
}

ログにも、きちんと期待された値が出力されていることが確認できました。

2024-723_cdk_blog_07

最後に

今回は、Step Functions内のLambdaをGlue (Python Shell)にAWS CDKを利用して置き換えてみました。

参考になりましたら幸いです。

参考文献

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.